/*###################################################################
  > File Name: storage/replica/diskmd/diskmd_lost.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Sat 24 Feb 2018 01:57:16 AM PST
###################################################################*/
#include "config.h"

#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <linux/fs.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <limits.h>
#include <errno.h>
#include <sys/vfs.h>

#define DBG_SUBSYS S_LIBTASK

#include "lich_api.h"

#include "fnotify.h"
#include "tpool.h"
#include "configure.h"
#include "nodectl.h"
#include "diskmd_pool.h"
#include "disk_redis.h"
#include "recovery.h"
#include "diskmd_recovery.h"


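/**
 * @todo handle the case where the owning pool has already been deleted
 *
 * @todo submitting too many chunks in one request causes an RPC timeout
 * @todo when there are fewer threads than volumes, should submission be
 *       synchronous or asynchronous?
 * @todo take node-level controller balance into account
 */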

/*
 * Worker thread pool sizing: default thread count, lower/upper bounds, and
 * the step by which the monitor thread raises or lowers the active count.
 */
#define DISKMD_THP_TH_DFT  (20)
#define DISKMD_THP_TH_MIN  (10)
#define DISKMD_THP_TH_MAX  (30)
#define DISKMD_THP_TH_STEP (5)

/*
 * Per-group commit batch bounds: a new group starts with
 * commit_count = DISKMD_REQ_MIN and commit_max = DISKMD_REQ_MAX.
 * DISKMD_REQ_COUNT is not referenced in the visible part of this file.
 */
#define DISKMD_REQ_MIN       (1)
#define DISKMD_REQ_MAX       (30)
#define DISKMD_REQ_COUNT     (1)

/* presumably the redo retry interval in seconds (3 minutes) -- TODO confirm */
#define DISKMD_REDO_INTERVAL (3 * 60)

/*
 * One chunk record. Records are linked through `hook` into either a
 * group's chunk_lst or a disk's fail_lst (both are drained as chunk_rec_t
 * in __disk_free()).
 */
typedef struct {
        struct list_head hook;  /* list linkage; first member */

        chkid_t id;             /* chunk id */
        uint64_t metaversion;   /* presumably the chunk's meta version -- TODO confirm */
        int err_code;           /* presumably the last recovery error -- TODO confirm */
} chunk_rec_t;

/*
 * A group of chunks queued under one group id, linked into
 * disk_rec_t.group_lst through `hook`.
 */
typedef struct __group_rec_t {
        struct list_head hook;          /* linkage in disk_rec_t.group_lst; first member */

        sy_rwlock_t lock;               /* taken by push/pop/update_commit */
        chkid_t id;                     /* group id, compared with chkid_cmp() */
        count_list_t chunk_lst;         /* pending chunk_rec_t entries */

        int refcount;                   /* group is only deleted when this is 0 */

        int commit_max;                 /* upper bound for commit_count */
        int commit_count;               /* batch size used by pop(); adapted by update_commit() */

        int scan;                       /* not used in the visible part of this file */
        int commit;                     /* not used in the visible part of this file */
        int busy;                       /* incremented by push() when its busy argument is non-zero */

        /* append `pos` to chunk_lst */
        int (*push)(struct __group_rec_t *group, struct list_head *pos, int busy);
        /* move entries from chunk_lst to `wqueue` until it holds commit_count entries */
        int (*pop)(struct __group_rec_t *group, count_list_t *wqueue);

        /* grow or shrink commit_count according to the result code `ret1` */
        int (*update_commit)(struct __group_rec_t *group, int ret1);
} group_rec_t;

/*
 * Recovery phase of a disk, dispatched on by __disk_reactor():
 * INIT runs pool lookup and the scan, SCAN/ANALYSIS run the analysis step,
 * DONE reports the total and marks the disk finished.
 */
typedef enum {
        __DISK_REC_INIT__ = 0,
        __DISK_REC_SCAN__,
        __DISK_REC_ANALYSIS__,
        __DISK_REC_DONE__,
} disk_rec_status_t;

/*
 * Values of disk_rec_t.disk_status. Only INIT disks are handed out by
 * __disk_select(); STOP disks are freed there once their refcount is 0.
 */
enum {
        __DISK_STATUS_INIT = 0,
        __DISK_STATUS_STOP
};

/*
 * Per-disk recovery state, linked into diskmd_rec_t.disk_lst through `hook`.
 */
typedef struct __disk_rec_t {
        struct list_head hook;          /* linkage in diskmd_rec_t.disk_lst; first member */

        sy_rwlock_t lock;               /* taken while group_lst is searched/extended */
        int refcount;                   /* changed under the handler lock by select/release */

        int diskid;
        disk_rec_status_t status;       /* recovery phase, see disk_rec_status_t */
        int disk_status;                /* __DISK_STATUS_INIT or __DISK_STATUS_STOP */
        int load_pool_count;            /* number of ENOENT results from the pool lookup */

        uint64_t recovered;             /* presumably a count of recovered chunks -- TODO confirm */
        char pool[MAX_NAME_LEN];        /* pool name filled in by diskmd_get_pool() */

        group_rec_t *ptr;               /* cursor into group_lst; starts at the list's next pointer */
        count_list_t group_lst;         /* group_rec_t entries */

        count_list_t fail_lst;          /* chunk_rec_t entries; presumably failed chunks -- TODO confirm */

        // int (*add_group)(struct __disk_rec_t *disk, const chkid_t *groupid);
        // int (*del_group)(struct __disk_rec_t *disk, const chkid_t *groupid);

        // int (*select_group)(struct __disk_rec_t *disk, group_rec_t **group);
        // int (*release_group)(struct __disk_rec_t *disk, group_rec_t *group);
} disk_rec_t;

/*
 * Worker thread state. INIT is set before the thread is created, the
 * thread itself moves to PAUSE on entry, the pool switches it between RUN
 * and PAUSE, STOP asks it to exit and STOPPED is set by the thread on exit.
 */
typedef enum {
        __TH_STATUS_INIT__ = 0,
        __TH_STATUS_RUN__,
        __TH_STATUS_PAUSE__,
        __TH_STATUS_STOP__,
        __TH_STATUS_STOPPED__,
} thp_status_t;

/* Per-worker-thread slot; a pointer to it is the thread's argument. */
typedef struct {
        int idx;                /* printed in the worker's log messages */
        thp_status_t status;    /* written by both the pool and the worker */
} diskmd_thp_seg_t;


/* Worker thread pool state plus the recovery report it produces. */
typedef struct {
        sy_rwlock_t lock;       /* taken by the thread start/stop routines */

        int started;            /* 1 once the worker threads have been created */
        int thread_max;         /* number of threads created */
        int thread_num;         /* number of threads currently set to RUN */
        diskmd_thp_seg_t segs[DISKMD_THP_TH_MAX];

        timerange1_t range;     /* fed with out.success to derive out.speed */

        int status;             /* last __RECOVERY_*__ status written by recovery_disk_dump() */
        recovery_out_t out;     /* counters dumped by recovery_disk_dump() */
        // nodectl_option_t opt_switch;
        nodectl_option_t opt_commit_max;        /* started on "recovery/<pool>/commit_max" */
} diskmd_rec_thp_t;

/* Chunk id / node id pair; not used in the visible part of this file. */
typedef struct {
        chkid_t id;
        nid_t nid;
} entry_t;

/* Entry of diskmd_rec_t.redo_lst -- presumably a disk scheduled for a redo pass (TODO confirm). */
typedef struct {
        struct list_head hook;  /* list linkage; first member */

        time_t last;            /* presumably the time of the last attempt -- TODO confirm */
        int diskid;
} diskmd_rec_redo_t;

/* Speed sample window used by the monitor thread to tune the thread count. */
typedef struct {
        int thread_num; /* thread count in effect for this window */
        int count;      /* number of samples accumulated */
        int total;      /* sum of the sampled speeds */
} diskmd_rec_stat_t;

/* Top-level disk recovery state; a single instance is reached through diskmd_rec_handler. */
typedef struct {
        sy_rwlock_t lock;               /* guards disk_lst and the disks' refcount */

        count_list_t disk_lst;          /* disk_rec_t entries */
        diskmd_rec_thp_t thp;           /* worker thread pool */

        diskmd_rec_stat_t old_sample;   /* previous sampling window */
        diskmd_rec_stat_t new_sample;   /* window currently being filled */

        sy_spinlock_t redo_lock;        /* presumably guards redo_lst -- TODO confirm */
        count_list_t redo_lst;          /* presumably diskmd_rec_redo_t entries -- TODO confirm */
        worker_handler_t redo_handler;

        token_bucket_t token_bucket;    /* not used in the visible part of this file */
} diskmd_rec_t;

/* Module-wide recovery state; NULL until set up elsewhere in this file. */
static diskmd_rec_t *diskmd_rec_handler = NULL;

/* disk list membership */
static int diskmd_rec_disk_add(diskmd_rec_t *handler, int diskid);
static int diskmd_rec_disk_del(diskmd_rec_t *handler, int diskid);

/* reference-counted access to entries of handler->disk_lst */
static int __disk_select(diskmd_rec_t *handler, int diskid, disk_rec_t **disk);
static int __disk_release(diskmd_rec_t *handler, disk_rec_t *disk);
static int __disk_detach(diskmd_rec_t *handler, int diskid, disk_rec_t **disk);
static int __disk_free(disk_rec_t *disk);

/* recovery phases driven by __disk_reactor() */
static int diskmd_rec_disk_scan(disk_rec_t *disk);
static int diskmd_rec_disk_analysis(disk_rec_t *disk);

static int diskmd_rec_disk_redo(int diskid);

/* group list membership of one disk */
static int diskmd_rec_group_add(disk_rec_t *disk, const chkid_t *groupid);
static int diskmd_rec_group_del(disk_rec_t *disk, group_rec_t **_group);

/* worker thread pool control */
static int diskmd_rec_thp_th_start(int thread_num, int thread_max);
static int diskmd_rec_thp_th_stop();

static void diskmd_rec_thp_start();

/* writes the recovery report; the only non-static declaration in this group */
void recovery_disk_dump(recovery_out_t *out, int new_status);

/**
 * Return a recovery report to its idle state.
 *
 * Every counter and the speed are cleared, the status becomes
 * __RECOVERY_WAITING__ and lastscan is stamped with the current time.
 * pool_name and success_total are left untouched.
 *
 * @param out report to reset
 */
static inline void recovery_out_reset(recovery_out_t *out)
{
        /* progress counters */
        out->recovery = 0;
        out->success = 0;
        out->fail = 0;

        /* chunk classification counters */
        out->lost = 0;
        out->offline = 0;

        out->speed = 0;

        out->lastscan = gettime();
        out->status = __RECOVERY_WAITING__;
}

/* Recovers `chk_count` chunks of one group in a single call; defined later in this file. */
static int __diskmd_rec_disk_rec_multi(const chkid_t *groupid, const chkid_t *chkids, int chk_count, int *retval);


/* Text file holding the configured recovery thread count, see diskmd_rec_get_thread(). */
#define DISKMD_THP "/opt/fusionstack/data/recovery/disk_thread"

/**
 * Read the configured number of recovery threads from DISKMD_THP.
 *
 * When the file cannot be read, the default (DISKMD_THP_TH_DFT) is written
 * back to it on a best-effort basis and used as the value.
 *
 * @return the configured count; DISKMD_THP_TH_MAX when the value exceeds
 *         the maximum; DISKMD_THP_TH_DFT when the value is zero, negative
 *         or not a number.
 */
int diskmd_rec_get_thread()
{
        int ret;
        long parsed;
        char value[MAX_BUF_LEN], defvalue[MAX_BUF_LEN];

        YASSERT(DISKMD_THP_TH_MAX >= DISKMD_THP_TH_DFT);

        ret = _get_text(DISKMD_THP, value, MAX_BUF_LEN);
        if (ret < 0) {
                snprintf(defvalue, sizeof(defvalue), "%d", DISKMD_THP_TH_DFT);

                /* best effort: a failed write only costs the persisted default */
                ret = _set_text(DISKMD_THP, defvalue, strlen(defvalue) + 1, O_CREAT | O_TRUNC);
                if (unlikely(ret)) {
                        DWARN("set failed\n");
                }

                strcpy(value, defvalue);
        }

        /*
         * strtol() instead of atoi(): atoi() is undefined on overflow, while
         * strtol() saturates and the result is clamped below anyway.
         */
        parsed = strtol(value, NULL, 10);
        if (unlikely(parsed > DISKMD_THP_TH_MAX))
                return DISKMD_THP_TH_MAX;
        else if (unlikely(parsed <= 0))
                return DISKMD_THP_TH_DFT;
        else
                return (int)parsed;
}

/**
 * Pick a disk to work on and take a reference on it.
 *
 * Under the handler write lock the disk list is first swept: disks in
 * __DISK_STATUS_INIT are counted, disks in __DISK_STATUS_STOP with no
 * references are unlinked and freed. Then the first __DISK_STATUS_INIT disk
 * is returned with its refcount incremented.
 *
 * @param handler recovery state owning the disk list
 * @param diskid  must be -1 ("any disk"); any other value asserts
 * @param disk    out: selected disk, NULL on failure
 * @return 0 on success, ENOENT when no usable disk exists, or the lock error
 */
static int __disk_select(diskmd_rec_t *handler, int diskid, disk_rec_t **disk)
{
        int ret, alive = 0;
        struct list_head *cur, *next;
        disk_rec_t *ent;

        *disk = NULL;

        ret = sy_rwlock_wrlock(&handler->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* sweep: count live disks, reap stopped ones nobody references */
        list_for_each_safe(cur, next, &handler->disk_lst.list) {
                ent = list_entry(cur, disk_rec_t, hook);

                if (likely(ent->disk_status == __DISK_STATUS_INIT)) {
                        alive++;
                        continue;
                }

                if (ent->disk_status != __DISK_STATUS_STOP)
                        continue;

                if (ent->refcount != 0)
                        continue;

                DWARN("disk %d %p\n", ent->diskid, ent);
                count_list_del(cur, &handler->disk_lst);
                __disk_free(ent);
        }

        if (unlikely(alive == 0)) {
                ret = ENOENT;
                goto err_lock;
        }

        if (diskid != -1) {
                /* selecting a specific disk is not supported */
                YASSERT(0);
        } else {
                list_for_each_safe(cur, next, &handler->disk_lst.list) {
                        ent = list_entry(cur, disk_rec_t, hook);

                        if (unlikely(ent->disk_status != __DISK_STATUS_INIT))
                                continue;

                        ent->refcount++;
                        *disk = ent;
                        break;
                }
        }

        if (*disk == NULL) {
                ret = ENOENT;
                goto err_lock;
        }

        sy_rwlock_unlock(&handler->lock);
        return 0;
err_lock:
        sy_rwlock_unlock(&handler->lock);
err_ret:
        return ret;
}

/**
 * Drop the reference taken by __disk_select().
 *
 * @param handler recovery state whose lock protects the refcount
 * @param disk    disk to release
 * @return 0 on success, or the error from taking the handler lock
 */
static int __disk_release(diskmd_rec_t *handler, disk_rec_t *disk)
{
        int ret = sy_rwlock_wrlock(&handler->lock);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the refcount is only ever changed under the handler write lock */
        disk->refcount -= 1;
        sy_rwlock_unlock(&handler->lock);

        return 0;
err_ret:
        return ret;
}

/**
 * Unlink a disk from the handler's disk list without freeing it.
 *
 * @param handler recovery state owning the disk list
 * @param diskid  id of the disk to unlink
 * @param disk    out: the unlinked disk (now owned by the caller), NULL on
 *                failure
 * @return 0 on success, ENOENT when no such disk is listed, EBUSY when the
 *         disk still has references, or the lock error
 */
static int __disk_detach(diskmd_rec_t *handler, int diskid, disk_rec_t **disk)
{
        int ret, found = 0;
        disk_rec_t *_disk = NULL;
        struct list_head *pos, *n;

        *disk = NULL;

        ret = sy_rwlock_wrlock(&handler->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(pos, n, &handler->disk_lst.list) {
                /* list_entry() rather than a raw cast: no reliance on hook being the first member */
                _disk = list_entry(pos, disk_rec_t, hook);

                if (_disk->diskid == diskid) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                YASSERT(_disk->refcount >= 0);

                if (_disk->refcount == 0) {
                        count_list_del(&_disk->hook, &handler->disk_lst);
                        *disk = _disk;
                } else {
                        /* still in use by a worker; the caller may retry later */
                        DWARN("disk %d refcount %d\n", _disk->diskid, _disk->refcount);
                        ret = EBUSY;
                        GOTO(err_lock, ret);
                }
        } else {
                ret = ENOENT;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&handler->lock);
        return 0;
err_lock:
        sy_rwlock_unlock(&handler->lock);
err_ret:
        return ret;
}

/**
 * Destroy a disk record together with everything it owns: all groups, the
 * chunks queued in each group, the failed-chunk list and both kinds of lock.
 *
 * The disk must already be unlinked from the handler's disk list.
 *
 * @param disk disk to free; NULL is accepted and ignored
 * @return always 0
 */
static int __disk_free(disk_rec_t *disk)
{
        struct list_head *gpos, *gnext;
        struct list_head *cpos, *cnext;
        group_rec_t *grp;
        chunk_rec_t *chk;

        if (disk == NULL)
                return 0;

        DWARN("disk %d %p\n", disk->diskid, disk);

        /* groups, each with its pending chunks */
        list_for_each_safe(gpos, gnext, &disk->group_lst.list) {
                grp = list_entry(gpos, group_rec_t, hook);
                count_list_del(gpos, &disk->group_lst);

                list_for_each_safe(cpos, cnext, &grp->chunk_lst.list) {
                        chk = list_entry(cpos, chunk_rec_t, hook);
                        count_list_del(cpos, &grp->chunk_lst);

                        yfree((void **)&chk);
                }

                sy_rwlock_destroy(&grp->lock);
                yfree((void **)&grp);
        }

        /* chunks parked on the failure list */
        list_for_each_safe(cpos, cnext, &disk->fail_lst.list) {
                chk = list_entry(cpos, chunk_rec_t, hook);
                count_list_del(cpos, &disk->fail_lst);

                yfree((void **)&chk);
        }

        sy_rwlock_destroy(&disk->lock);
        yfree((void **)&disk);

        return 0;
}

/**
 * Append a chunk record to the tail of a group's pending list.
 *
 * @param group group to extend
 * @param pos   list hook of the chunk record to queue
 * @param busy  when non-zero, the group's busy counter is incremented too
 * @return always 0; a failure to take the group lock is fatal
 */
static int __group_push(group_rec_t *group, struct list_head *pos, int busy)
{
        int ret;

        ret = sy_rwlock_wrlock(&group->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        count_list_add_tail(pos, &group->chunk_lst);
        group->busy += busy ? 1 : 0;

        sy_rwlock_unlock(&group->lock);

        return 0;
}

/**
 * Move pending chunk records from the head of a group's list to `wqueue`.
 *
 * Entries are moved one at a time; the transfer stops as soon as `wqueue`
 * holds at least commit_count entries or the group's list is empty. The
 * size check happens after each move, so a non-empty group always gives up
 * at least one entry.
 *
 * @param group  group to take chunks from
 * @param wqueue destination list
 * @return always 0; a failure to take the group lock is fatal
 */
static int __group_pop(group_rec_t *group, count_list_t *wqueue)
{
        int ret;
        struct list_head *head;

        // TODO when change commit_max danamically
        // YASSERT(group->commit_count <= group->commit_max);

        ret = sy_rwlock_wrlock(&group->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        while (!list_empty(&group->chunk_lst.list)) {
                head = group->chunk_lst.list.next;

                count_list_del(head, &group->chunk_lst);
                count_list_add_tail(head, wqueue);

                if (wqueue->count >= group->commit_count)
                        break;
        }

        sy_rwlock_unlock(&group->lock);

        return 0;
}

/**
 * Adapt a group's commit batch size to the outcome of the last commit.
 *
 * On success the batch grows by one, capped at commit_max. On a
 * congestion-type result (EBUSY, ENONET, ESTALE, ETIMEDOUT, EAGAIN) it
 * shrinks to two thirds, but never below DISKMD_REQ_MIN. Any other result
 * is only logged.
 *
 * @param group group whose commit_count is adjusted
 * @param ret1  result code of the last commit
 * @return always 0; a failure to take the group lock is fatal
 */
static int __group_update_commit(group_rec_t *group, int ret1)
{
        int ret;

        ret = sy_rwlock_wrlock(&group->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        switch (ret1) {
        case 0:
                /* additive increase */
                if (group->commit_count < group->commit_max)
                        group->commit_count += 1;
                else
                        group->commit_count = group->commit_max;
                break;
        case EBUSY:
        case ENONET:
        case ESTALE:
        case ETIMEDOUT:
        case EAGAIN:
                /* multiplicative decrease */
                group->commit_count = group->commit_count * 2 / 3;

                if (group->commit_count < DISKMD_REQ_MIN)
                        group->commit_count = DISKMD_REQ_MIN;
                break;
        default:
                DWARN("group "CHKID_FORMAT", ret %d\n",
                      CHKID_ARG(&group->id), ret1);
                break;
        }

        sy_rwlock_unlock(&group->lock);

        return 0;
}

/**
 * Allocate and initialise an empty group record.
 *
 * The new group starts with no chunks, a zero refcount and zeroed
 * scan/commit/busy counters; its batch size starts at DISKMD_REQ_MIN with
 * an upper bound of DISKMD_REQ_MAX. The group is not linked into any list.
 *
 * @param group   out: the new group, NULL on failure
 * @param groupid id copied into the group
 * @return 0 on success, otherwise the allocation or lock-init error
 */
static int group_create(group_rec_t **group, const chkid_t *groupid)
{
        int ret;
        group_rec_t *_group;

        *group = NULL;

        ret = ymalloc((void **)&_group, sizeof(group_rec_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* checked like the disk lock in diskmd_rec_disk_add() */
        ret = sy_rwlock_init(&_group->lock, "diskmd_rec_group.lock");
        if (unlikely(ret))
                GOTO(err_free, ret);

        _group->id = *groupid;
        _group->refcount = 0;

        _group->commit_max = DISKMD_REQ_MAX;
        _group->commit_count = DISKMD_REQ_MIN;

        /*
         * Explicitly zeroed: __group_push() increments busy, so it must not
         * depend on the allocator returning cleared memory.
         */
        _group->scan = 0;
        _group->commit = 0;
        _group->busy = 0;

        count_list_init(&_group->chunk_lst);

        _group->push = __group_push;
        _group->pop = __group_pop;
        _group->update_commit = __group_update_commit;

        *group = _group;

        return 0;
err_free:
        yfree((void **)&_group);
err_ret:
        return ret;
}

/**
 * Advance one disk's recovery by one step, depending on its phase.
 *
 *  - __DISK_REC_INIT__: look up the disk's pool, start the per-pool
 *    commit_max option, reset success_total, then run the scan.
 *  - __DISK_REC_SCAN__ / __DISK_REC_ANALYSIS__: run the analysis step.
 *  - __DISK_REC_DONE__: publish the recovery total and report completion.
 *
 * @param disk  disk to process
 * @param _done out: 1 when the disk has finished recovery, else 0 (always
 *              written, also on error)
 * @return 0 on success; the error of the failing step otherwise;
 *         ENAMETOOLONG when the option path does not fit its buffer
 */
static int __disk_reactor(disk_rec_t *disk, int *_done)
{
        int ret, done = 0, diskid, len;
        diskmd_rec_t *handler = diskmd_rec_handler;
        nodectl_option_t *opt_commit_max = &handler->thp.opt_commit_max;

        diskid = disk->diskid;

        switch (disk->status) {
                case __DISK_REC_INIT__:
                        {
                                char path[MAX_NAME_LEN];

                                // TODO depends on related services; can this lookup fail?
                                // 1. if disk recovery finish and destroyed, then here is ENOENT
                                // 2. afterward, diskid is reused, this is another disk
                                ret = diskmd_get_pool(diskid, disk->pool);
                                if (unlikely(ret)) {
                                        DWARN("disk %d status %d count %d ret %d\n",
                                              diskid, disk->disk_status, disk->load_pool_count, ret);

                                        if (ret == ENOENT) {
                                                /* give up on the disk after more than 100 misses */
                                                disk->load_pool_count++;
                                                if (disk->load_pool_count > 100) {
                                                        disk->disk_status = __DISK_STATUS_STOP;
                                                }
                                        }
                                        GOTO(err_ret, ret);
                                }

                                /* NOTE(review): the size of out.pool_name is not visible here -- confirm it holds MAX_NAME_LEN bytes */
                                strcpy(handler->thp.out.pool_name, disk->pool);

#if 0
                                sprintf(path, "recovery/%s/disk_switch", disk->pool);
                                ret = nodectl_option_start(&handler->thp.opt_switch, path, "1");
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);
#endif

                                /*
                                 * pool[] is as large as path[], so the prefix and suffix
                                 * can push the result past the buffer: bound the write.
                                 */
                                len = snprintf(path, sizeof(path), "recovery/%s/commit_max", disk->pool);
                                if (unlikely(len < 0 || len >= (int)sizeof(path))) {
                                        ret = ENAMETOOLONG;
                                        GOTO(err_ret, ret);
                                }

                                ret = opt_commit_max->start(opt_commit_max, path, "15", NULL);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                handler->thp.out.success_total = 0;
                        }

                        ret = diskmd_rec_disk_scan(disk);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        break;
                case __DISK_REC_SCAN__:
                case __DISK_REC_ANALYSIS__:
                        ret = diskmd_rec_disk_analysis(disk);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        break;
                case __DISK_REC_DONE__:
                        set_recovery_total(disk->pool, handler->thp.out.success_total, TRUE);
                        done = 1;
                        break;
        }

        *_done = done;
        return 0;
err_ret:
        *_done = done;
        return ret;
}

/**
 * Worker thread body of the recovery thread pool.
 *
 * The thread parks itself in __TH_STATUS_PAUSE__ on entry and then loops
 * until its slot is set to __TH_STATUS_STOP__: while paused it sleeps,
 * otherwise it selects a disk, runs one __disk_reactor() step on it,
 * releases it, and deletes the disk once the reactor reports completion.
 *
 * @param arg the thread's diskmd_thp_seg_t slot
 * @return always NULL
 */
static void *diskmd_rec_th_cb(void *arg)
{
        int ret, done = 0, diskid;

        disk_rec_t *disk = NULL;
        diskmd_thp_seg_t *seg = arg;
        diskmd_rec_t *handler = diskmd_rec_handler;

        DBUG("diskmd_thp th %d started!\n", seg->idx);

        /* signals diskmd_rec_thp_th_start() that this thread is up */
        seg->status = __TH_STATUS_PAUSE__;

        while (1) {
                if (seg->status == __TH_STATUS_STOP__)
                        break;

                if (seg->status == __TH_STATUS_PAUSE__) {
                        usleep(1000 * 1000);
                        continue;
                }

                /* takes a reference on the disk; nothing to do when none is available */
                ret = __disk_select(handler, -1, &disk);
                if (unlikely(ret)) {
                        usleep(1000 * 1000);
                        continue;
                }

                /* remembered now: disk must not be touched after the release below */
                diskid = disk->diskid;

                ANALYSIS_BEGIN(0);

                ret = __disk_reactor(disk, &done);
                if (unlikely(ret)) {
                        /* back off: longer for ENOENT, shorter for everything else */
                        if (ret == ENOENT) {
                                usleep(1000 * 1000);
                        } else {
                                if (ret != EBUSY) {
                                        // DWARN("disk %d ret %d\n", disk->diskid, ret);
                                }
                                usleep(100 * 1000);
                        }
                }

                ANALYSIS_END(0, 500 * 1000, NULL);

                __disk_release(handler, disk);

                if (unlikely(done)) {
                        /* the delete fails while the disk is missing or still referenced */
                        ret = diskmd_rec_disk_del(handler, diskid);
                        if (unlikely(ret)) {
                                usleep(1000 * 1000);
                        }
                }
        }

        /* acknowledged by diskmd_rec_thp_th_stop() */
        seg->status = __TH_STATUS_STOPPED__;

        DINFO("thread %d stopped\n", seg->idx);
        return NULL;
}

static int diskmd_rec_thp_th_start(int thread_num, int thread_max)
{
        int i, ret;
        diskmd_rec_thp_t *thp = &diskmd_rec_handler->thp;

        YASSERT(thread_max <= DISKMD_THP_TH_MAX);
        YASSERT(thread_num <= thread_max);

        ret = sy_rwlock_wrlock(&thp->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (thp->started == 0) {
                thp->thread_max = thread_max;

                YASSERT(thp->thread_num == 0);
                YASSERT(thp->out.success == 0);
                YASSERT(thp->out.fail == 0);
                YASSERT(thp->range.p1.count == 0);
                YASSERT(thp->range.p2.count == 0);

                for (i = 0; i < thread_max; i ++) {
                        thp->segs[i].status = __TH_STATUS_INIT__;
                        ret = sy_thread_create(diskmd_rec_th_cb, &thp->segs[i]);
                        if (unlikely(ret)) {
                                YASSERT(0);
                                GOTO(err_unlock, ret);
                        }
                }

                for (i = 0; i < thread_max; i ++) {
                        while (1) {
                                if (thp->segs[i].status == __TH_STATUS_PAUSE__)
                                        break;
                                usleep(50 * 1000);
                        }
                }

                thp->started = 1;
        }

        YASSERT(thp->started);

        if (thread_num != thp->thread_num) {
                DINFO("thread_max %d thread_num %d -> %d\n",
                      thp->thread_max,
                      thp->thread_num,
                      thread_num);

                for (i = 0; i < thread_num; i++) {
                        thp->segs[i].status = __TH_STATUS_RUN__;
                }

                for (i = thread_num; i < thp->thread_max; i++) {
                        thp->segs[i].status = __TH_STATUS_PAUSE__;
                }

                thp->thread_num = thread_num;
        }

        sy_rwlock_unlock(&thp->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&thp->lock);
err_ret:
        return ret;
}

/**
 * Ask every worker thread to stop and wait until all have done so.
 *
 * All slots are set to __TH_STATUS_STOP__, then the slots are polled every
 * 50ms until each reports __TH_STATUS_STOPPED__. While waiting, the speed
 * in the recovery report keeps being refreshed. Afterwards the pool is
 * marked as not started with zero running threads.
 *
 * @return 0 on success (also when the pool was never started), otherwise
 *         the error from taking the pool lock
 */
static int diskmd_rec_thp_th_stop()
{
        int i, ret, done;
        diskmd_rec_thp_t *thp = &diskmd_rec_handler->thp;

        ret = sy_rwlock_wrlock(&thp->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!thp->started) {
                sy_rwlock_unlock(&thp->lock);
                return 0;
        }

        DINFO("thread_max %d thread_num %d\n", thp->thread_max, thp->thread_num);

        i = 0;
        while (i < thp->thread_max)
                thp->segs[i++].status = __TH_STATUS_STOP__;

        sy_rwlock_unlock(&thp->lock);

        for (;;) {
                ret = sy_rwlock_rdlock(&thp->lock);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                /* stays 1 only if every slot has reached STOPPED */
                done = 1;
                for (i = 0; done && i < thp->thread_max; i++) {
                        if (thp->segs[i].status != __TH_STATUS_STOPPED__)
                                done = 0;
                }

                sy_rwlock_unlock(&thp->lock);

                if (!done) {
                        usleep(50 * 1000);

                        // TODO wait thread stop
                        if (timerange1_update(&thp->range, thp->out.success)) {
                                thp->out.speed = thp->range.speed;
                                recovery_disk_dump(&thp->out, -1);
                        }

                        continue;
                }

                thp->thread_num = 0;
                thp->started = 0;
                break;
        }

        YASSERT(thp->thread_num == 0);

        return 0;
err_ret:
        return ret;
}

/**
 * Map a __RECOVERY_*__ status to the word written into the recovery report.
 *
 * @param status recovery status code
 * @return "running", "waiting" or "scanning" for the matching status,
 *         "suspend" for every other value
 */
static inline const char *_recovery_disk_status2str(int status)
{
        const char *name = "suspend";

        if (status == __RECOVERY_RUNNING__) {
                name = "running";
        } else if (status == __RECOVERY_WAITING__) {
                name = "waiting";
        } else if (status == __RECOVERY_SCANNING__) {
                name = "scanning";
        }

        return name;
}

/**
 * Update the pool's disk-recovery status and publish the report.
 *
 * @param out        report to publish; reset first when `new_status` is
 *                   __RECOVERY_WAITING__
 * @param new_status new pool recovery status, or -1 to keep the current one
 *
 * Nothing is published while out->pool_name is empty, or when the report
 * path does not fit its buffer.
 */
void recovery_disk_dump(recovery_out_t *out, int new_status)
{
        int len;
        char path[MAX_NAME_LEN];
        char value[MAX_BUF_LEN];
        diskmd_rec_t *handler = diskmd_rec_handler;

        if (new_status == __RECOVERY_WAITING__) {
                recovery_out_reset(out);
        }

        if (new_status != -1) {
                DINFO("pool %s recovery_status %s -> %s\n",
                      out->pool_name,
                      _recovery_disk_status2str(handler->thp.status),
                      _recovery_disk_status2str(new_status));

                handler->thp.status = new_status;
        }

        if (strlen(out->pool_name) == 0) {
                return;
        }

        /* bounded by the buffer actually written to, not by MAX_NAME_LEN */
        snprintf(value, sizeof(value),
                 "status:%s\n"
                         "recovery:%ju\n"
                         "check:0\n"
                         "success:%ju\n"
                         "success_total:%ju\n"
                         "fail:%ju\n"
                         "lost:%ju\n"
                         "offline:%ju\n"
                         "speed:%ju\n"
                         "lastscan:%u\n",
                 _recovery_disk_status2str(handler->thp.status),
                 out->recovery,
                 out->success,
                 out->success_total,
                 out->fail,
                 out->lost,
                 out->offline,
                 out->speed,
                 (unsigned int)out->lastscan);

        /**
         * @todo There are currently two recovery processes, one for node
         * failures and one for disk failures, and each dumps its output to a
         * different file. Later, introduce recovery_out to handle both cases
         * uniformly and simplify the interaction with upper-layer
         * applications.
         */
        len = snprintf(path, sizeof(path), RECOVERY"%s"RECOVERY_DISK_INFO, out->pool_name);
        if (len < 0 || len >= (int)sizeof(path)) {
                /* a truncated path would address the wrong node: skip the dump */
                DWARN("pool %s recovery info path too long\n", out->pool_name);
                return;
        }

        nodectl_set(path, value);
}

/**
 * Monitor thread: drives the worker pool for as long as disks are queued
 * and tunes the number of running workers from the measured speed.
 *
 * Each pass of the outer loop (re)starts the pool with the thread count of
 * the current sample window, then samples the speed once per second while
 * the disk list is non-empty. After more than 30 samples the window is
 * closed, a new thread count is chosen, and if it differs from the previous
 * one the outer loop runs again to apply it. When the disk list is empty,
 * the pool is stopped and the report is reset to "waiting".
 *
 * @param arg unused
 * @return always NULL
 */
static void *diskmd_rec_thp_th_monitor(void *arg)
{
        int ret, done = 0, thread_num;
        diskmd_rec_t *handler = diskmd_rec_handler;

        (void) arg;

        /* is_recovery_started is defined outside the visible part of this file */
        while (!is_recovery_started) {
                DWARN("recovery not started, wait...\n");
                usleep(2000 *1000);
        }

        DINFO("start thread %d\n", DISKMD_THP_TH_DFT);

        handler->new_sample.thread_num = DISKMD_THP_TH_DFT;

        while (!done) {
#if 0
                if (handler->thp.opt_switch.value == 0) {
                        usleep(1000 *1000);
                        DWARN("shutdown\n");
                        break;
                }
#endif

                /* assume this is the last pass; cleared below when the thread count changes */
                done = 1;

                handler->thp.out.lastscan = gettime();

                recovery_disk_dump(&handler->thp.out, __RECOVERY_SCANNING__);

                ret = diskmd_rec_thp_th_start(handler->new_sample.thread_num, DISKMD_THP_TH_MAX);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                recovery_disk_dump(&handler->thp.out, __RECOVERY_RUNNING__);

                /* NOTE(review): disk_lst.count is read here without taking handler->lock */
                while (handler->disk_lst.count) {
                        usleep(1000 *1000);

                        if (timerange1_update(&handler->thp.range, handler->thp.out.success)) {
                                handler->thp.out.speed = handler->thp.range.speed;
                                recovery_disk_dump(&handler->thp.out, -1);
                        }

                        DINFO("pool_name %s disk %d thread %d speed %ju\n",
                              handler->thp.out.pool_name,
                              handler->disk_lst.count,
                              handler->thp.thread_num,
                              handler->thp.out.speed);

                        /* one sample per iteration */
                        handler->new_sample.count++;
                        handler->new_sample.total += handler->thp.out.speed;

                        if (handler->new_sample.count > 30) {
                                thread_num = handler->new_sample.thread_num;

                                if (handler->old_sample.count == 0) {
                                        /* no previous window to compare with: try more threads */
                                        thread_num += DISKMD_THP_TH_STEP;
                                } else {
                                        YASSERT(handler->old_sample.count > 0 && handler->new_sample.count > 0);

                                        int old_speed = handler->old_sample.total / handler->old_sample.count;
                                        int new_speed = handler->new_sample.total / handler->new_sample.count;

                                        DINFO("prev thread %d %d current thread %d %d\n",
                                              handler->old_sample.thread_num,
                                              old_speed,
                                              handler->new_sample.thread_num,
                                              new_speed);

                                        /*
                                         * Speed went up: keep moving in the direction of the last
                                         * change. Speed did not go up: move in the opposite
                                         * direction. An unchanged thread count always steps up.
                                         */
                                        if (old_speed < new_speed) {
                                                if (handler->old_sample.thread_num <= handler->new_sample.thread_num) {
                                                        thread_num += DISKMD_THP_TH_STEP;
                                                } else {
                                                        thread_num -= DISKMD_THP_TH_STEP;
                                                }
                                        } else {
                                                if (handler->old_sample.thread_num >= handler->new_sample.thread_num) {
                                                        thread_num += DISKMD_THP_TH_STEP;
                                                } else {
                                                        thread_num -= DISKMD_THP_TH_STEP;
                                                }
                                        }
                                }

                                /* presumably clamps thread_num into [MIN, thread_max] -- TODO confirm */
                                itorange(&thread_num, DISKMD_THP_TH_MIN, handler->thp.thread_max);

                                /* close the window and open a new one for the chosen count */
                                handler->old_sample = handler->new_sample;

                                handler->new_sample.thread_num = thread_num;
                                handler->new_sample.count = 0;
                                handler->new_sample.total = 0;

                                if (handler->old_sample.thread_num != handler->new_sample.thread_num) {
                                        /* go around the outer loop to apply the new count */
                                        done = 0;
                                        break;
                                }
                        }

                        // load disk fill_rate
                }

        }

        ret = diskmd_rec_thp_th_stop();
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        set_recovery_total(handler->thp.out.pool_name, handler->thp.out.success_total, TRUE);

        recovery_disk_dump(&handler->thp.out, __RECOVERY_WAITING__);
        timerange1_init(&handler->thp.range, "diskmd_rec", 1000 * 1000);

        DINFO("stopped!\n");
        return NULL;
}

/**
 * Launch the monitor thread unless the worker pool is already started.
 *
 * The check and the thread creation both happen under the pool write
 * lock. A failure to take the lock or to create the thread is fatal.
 */
static void diskmd_rec_thp_start()
{
        int ret, idle;
        diskmd_rec_thp_t *thp = &diskmd_rec_handler->thp;

        ret = sy_rwlock_wrlock(&thp->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        idle = (thp->started == 0);
        if (idle) {
                ret = sy_thread_create2(diskmd_rec_thp_th_monitor, NULL, "diskmd_rec_thp_th_monitor");
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        sy_rwlock_unlock(&thp->lock);
}

/**
 * Make sure a disk has a group record for `groupid`.
 *
 * If a group with this id is already listed, nothing changes; otherwise a
 * new empty group is created and appended to the disk's group list.
 *
 * @param disk    disk whose group list is extended
 * @param groupid id of the group to add
 * @return 0 on success (including "already present"), otherwise the lock or
 *         allocation error
 */
static int diskmd_rec_group_add(disk_rec_t *disk, const chkid_t *groupid)
{
        int ret;
        group_rec_t *group = NULL;
        struct list_head *pos, *n;

        ret = sy_rwlock_wrlock(&disk->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(pos, n, &disk->group_lst.list) {
                /* list_entry() rather than a raw cast: no reliance on hook being the first member */
                group = list_entry(pos, group_rec_t, hook);

                if (0 == chkid_cmp(groupid, &group->id)) {
                        /* already known: nothing to do */
                        goto out;
                }
        }

        ret = group_create(&group, groupid);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        count_list_add_tail(&group->hook, &disk->group_lst);

        DINFO("diskmd_rec disk %d add group "CHKID_FORMAT"\n", disk->diskid, CHKID_ARG(groupid));

out:
        sy_rwlock_unlock(&disk->lock);
        return 0;
err_unlock:
        sy_rwlock_unlock(&disk->lock);
err_ret:
        return ret;
}

/**
 * Remove and free a group once it is no longer needed.
 *
 * The group is only deleted when the disk is in the analysis phase and the
 * group has no references; its chunk list must then be empty. If the disk's
 * group cursor points at this group, the cursor is advanced first.
 *
 * @param disk   disk owning the group
 * @param _group in: pointer to the group to delete
 * @return always 0
 *
 * NOTE(review): *_group is not cleared when the group is freed, so the
 * caller's pointer dangles afterwards -- confirm no caller uses it again.
 * NOTE(review): disk->lock is not taken here -- presumably the caller
 * serialises access to group_lst; verify against the callers.
 */
static int diskmd_rec_group_del(disk_rec_t *disk, group_rec_t **_group)
{
        group_rec_t *group = *_group;

        if (disk->status == __DISK_REC_ANALYSIS__ && group->refcount == 0) {
                YASSERT(list_empty(&group->chunk_lst.list));

                DINFO("diskmd_rec disk %d del group "CHKID_FORMAT"\n", disk->diskid, CHKID_ARG(&group->id));
                if (disk->ptr == group)
                        disk->ptr = (group_rec_t *)disk->ptr->hook.next;

                count_list_del(&group->hook, &disk->group_lst);

                /* release the lock created in group_create(), as __disk_free() does */
                sy_rwlock_destroy(&group->lock);
                yfree((void **)&group);
        }

        return 0;
}

/**
 * Register a disk for recovery on @handler, unless it is already listed.
 *
 * Under handler->lock (write), the handler's disk list is searched for
 * @diskid. When absent, a new disk_rec_t is allocated, initialised to the
 * __DISK_REC_INIT__ / __DISK_STATUS_INIT state with empty group and fail
 * lists, and appended to handler->disk_lst.
 *
 * All list accesses go through @handler, i.e. the same object whose lock
 * is held, rather than through the global handler pointer.
 *
 * @param handler  recovery handler that owns the disk list
 * @param diskid   id of the disk to register
 * @return 0 on success (including "already present"); otherwise the error
 *         from locking, allocation or lock initialisation
 */
static int diskmd_rec_disk_add(diskmd_rec_t *handler, int diskid)
{
        int ret, found = 0;
        disk_rec_t *disk;
        struct list_head *pos, *n;

        ret = sy_rwlock_wrlock(&handler->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(pos, n, &handler->disk_lst.list) {
                disk = (disk_rec_t *)pos;
                if (disk->diskid == diskid) {
                        found = 1;
                        break;
                }
        }

        if (!found) {
                ret = ymalloc((void **)&disk, sizeof(disk_rec_t));
                if (unlikely(ret))
                        GOTO(err_unlock, ret);

                disk->diskid = diskid;
                disk->recovered = 0;
                disk->status = __DISK_REC_INIT__;
                disk->refcount = 0;
                disk->disk_status = __DISK_STATUS_INIT;
                disk->load_pool_count = 0;

                ret = sy_rwlock_init(&disk->lock, "disk_rec.lock");
                if (unlikely(ret))
                        GOTO(err_free, ret);

                count_list_init(&disk->group_lst);
                count_list_init(&disk->fail_lst);
                /* cursor starts at the first node after the group list head */
                disk->ptr = (group_rec_t *)disk->group_lst.list.next;

                count_list_add_tail(&disk->hook, &handler->disk_lst);

                DWARN("add disk %d count %d\n", diskid, handler->disk_lst.count);
        }

        sy_rwlock_unlock(&handler->lock);

        return 0;
err_free:
        yfree((void **)&disk);
err_unlock:
        sy_rwlock_unlock(&handler->lock);
err_ret:
        return ret;
}

/**
 * Detach a disk from the recovery handler and dispose of its record.
 *
 * After __disk_detach() hands back the disk record, every chunk left on
 * the disk's fail list is removed from the list and re-submitted for
 * recovery one at a time. A failing chunk is retried up to 10 more times
 * with a 100 ms pause between attempts; ENOSPC (from either the call or
 * the per-chunk result) stops the retries immediately. The chunk record is
 * freed whether or not recovery eventually succeeded.
 *
 * Finally, if the disk mapping still reports chunks for this disk, the
 * disk is queued on the redo list, and the disk record is freed.
 *
 * @param handler  recovery handler the disk is attached to
 * @param diskid   id of the disk to remove
 * @return 0 on success, or the error returned by __disk_detach()
 */
static int diskmd_rec_disk_del(diskmd_rec_t *handler, int diskid)
{
        int ret, retval, retry;
        uint64_t chk_count;
        chkid_t groupid;
        disk_rec_t *disk = NULL;
        chunk_rec_t *chunk_rec;
        struct list_head *pos, *n;

        ret = __disk_detach(handler, diskid, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(disk && (disk->diskid == diskid));

        DINFO("retry disk %d\n", diskid);

        list_for_each_safe(pos, n, &disk->fail_lst.list) {
                chunk_rec = (chunk_rec_t *)pos;

                DINFO(""CHKID_FORMAT" retry, last ret %d\n",
                      CHKID_ARG(&chunk_rec->id), chunk_rec->err_code);

                count_list_del(pos, &disk->fail_lst);
                retry = 0;
retry:
                retval = 0;
                chkid2ctl(&chunk_rec->id, &groupid);
                ret = __diskmd_rec_disk_rec_multi(&groupid, &chunk_rec->id, 1, &retval);
                if (unlikely(ret || retval)) {
                        if (retry < 10 && ret != ENOSPC && retval != ENOSPC) {
                                DERROR(" "CHKID_FORMAT" recover retry %d still failed, ret %d\n",
                                       CHKID_ARG(&chunk_rec->id), retry, ret ? ret : retval);
                                usleep(100 *1000);
                                retry ++;
                                goto retry;
                        } else {
                                /* report the per-chunk result when the call itself succeeded */
                                DERROR(" "CHKID_FORMAT" recover retry %d still failed, ret %d, give up....\n",
                                       CHKID_ARG(&chunk_rec->id), retry, ret ? ret : retval);
                        }
                }

                yfree((void**)&chunk_rec);
        }

        ret = disk_maping->count(diskid, &chk_count);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        /* chunks are still mapped to this disk: schedule another pass */
        if (unlikely(chk_count)) {
                diskmd_rec_disk_redo(diskid);
        }

        __disk_free(disk);
        return 0;
err_ret:
        return ret;
}

/**
 * Per-chunk iterator callback run before the real scan: flags the chunk
 * with __S_CHECK for the local node via md_chunk_set().
 *
 * Chunks of type __RAW_CHUNK__ are skipped. ENOENT from md_chunk_set() is
 * treated as success. Any other error is logged and handed to
 * USLEEP_RETRY with the 'retry' label, a limit of 5 and 100 ms.
 *
 * Only @chkid is used; the remaining arguments exist to match the
 * iterator callback signature.
 *
 * @return 0 on success / skip, otherwise the md_chunk_set() error reached
 *         through the err_ret label
 */
static int __diskmd_rec_pre_scan(const chkid_t *chkid, const char *_pool,
                                 const diskloc_t *_loc, const chkid_t *_parent,
                                 const uint64_t meta_version, void *_arg)
{
        int ret, retry = 0;

        (void) _arg;

        if (unlikely(chkid->type == __RAW_CHUNK__))
                return 0;

retry:
        ret = md_chunk_set(NULL, chkid, net_getnid(), __S_CHECK);
        if (likely(ret == 0))
                return 0;

        if (ret == ENOENT)
                return 0;

        DWARN(""CHKID_FORMAT" set check failed, ret %d\n",
              CHKID_ARG(chkid), ret);
        USLEEP_RETRY(err_ret, ret, retry, retry, 5 ,(100 * 1000));

        return 0;
err_ret:
        return ret;
}

/**
 * Record one chunk found on a failed disk as pending recovery work.
 *
 * The chunk's group id is derived with chkid2ctl(), the group record is
 * created on the disk if needed, and a freshly allocated chunk_rec_t is
 * appended to that group's chunk list. The group's scan counter and the
 * handler's recovery/offline output counters are bumped.
 *
 * Locking: disk->lock is held for reading while the group is looked up,
 * and the group's own lock is held for writing while its list is changed.
 *
 * Errors are not reported to the caller; on failure the chunk record (if
 * already allocated) is freed and the function simply returns.
 *
 * @param chkid         id of the chunk that was found
 * @param meta_version  meta version stored with the chunk record
 * @param disk          disk recovery record being populated
 */
static void __diskmd_rec_scan__(const chkid_t *chkid, uint64_t meta_version, disk_rec_t *disk)
{
        int ret;
        chkid_t groupid;
        chunk_rec_t *rec;
        struct list_head *cur, *next;
        group_rec_t *owner = NULL;

        chkid2ctl(chkid, &groupid);

        ret = diskmd_rec_group_add(disk, &groupid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&rec, sizeof(chunk_rec_t));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        rec->id = *chkid;
        rec->metaversion = meta_version;
        rec->err_code = 0;

        DBUG(""CHKID_FORMAT" --> "CHKID_FORMAT"\n", CHKID_ARG(chkid), CHKID_ARG(&groupid));

        ret = sy_rwlock_rdlock(&disk->lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        /* locate the group record that was just ensured above */
        list_for_each_safe(cur, next, &disk->group_lst.list) {
                group_rec_t *candidate = (group_rec_t *)cur;

                if (chkid_cmp(&groupid, &candidate->id) == 0) {
                        owner = candidate;
                        break;
                }
        }

        if (unlikely(owner == NULL))
                YASSERT(0 && "not found!");

        ret = sy_rwlock_wrlock(&owner->lock);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        owner->scan ++;
        count_list_add_tail(&rec->hook, &owner->chunk_lst);

        DBUG("disk %d group count %u, "CHKID_FORMAT" chunk count %u\n",
             disk->diskid, disk->group_lst.count,
             CHKID_ARG(&owner->id), owner->chunk_lst.count);

        diskmd_rec_handler->thp.out.recovery++;
        diskmd_rec_handler->thp.out.offline++;

        sy_rwlock_unlock(&owner->lock);

        sy_rwlock_unlock(&disk->lock);

        return;
err_unlock:
        sy_rwlock_unlock(&disk->lock);
err_free:
        yfree((void **)&rec);
err_ret:
        return;
}

static int __diskmd_rec_scan(const chkid_t *chkid, const char *_pool,
                                    const diskloc_t *_loc, const chkid_t *_parent,
                                    const uint64_t meta_version, void *_arg)
{
        disk_rec_t *disk = _arg;

        __diskmd_rec_scan__(chkid, meta_version, disk);
        return 0;
}

/**
 * Advance a disk's recovery state by exactly one step.
 *
 * Under disk->lock (write) it asserts that @status is the immediate
 * successor of the current state and then stores it. A failure to take
 * the lock is routed to UNIMPLEMENTED(__DUMP__).
 *
 * @param disk    disk recovery record to update
 * @param status  new state; must equal disk->status + 1
 */
void diskmd_rec_disk_next_state(disk_rec_t *disk, disk_rec_status_t status)
{
        int ret = sy_rwlock_wrlock(&disk->lock);

        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        YASSERT(disk->status + 1 == status);
        disk->status = status;

        sy_rwlock_unlock(&disk->lock);
}

/**
 * Scan phase: enumerate the chunks mapped to @disk and queue them.
 *
 * The disk is moved from __DISK_REC_INIT__ to __DISK_REC_SCAN__ under its
 * write lock; if it is in any other state the call returns 0 without
 * doing anything. With the lock released, three iterator passes run over
 * the disk mapping: a pre-scan over DM_FLAG_MD entries, then the scan
 * callback over DM_FLAG_MD and over DM_FLAG_RAW entries. Iterator return
 * values are not examined. Finally the disk advances to
 * __DISK_REC_ANALYSIS__.
 *
 * @return 0, or the error from taking disk->lock
 */
static int diskmd_rec_disk_scan(disk_rec_t *disk)
{
        int ret, claimed;

        ret = sy_rwlock_wrlock(&disk->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        claimed = (disk->status == __DISK_REC_INIT__);
        if (claimed)
                disk->status = __DISK_REC_SCAN__;

        sy_rwlock_unlock(&disk->lock);

        if (!claimed)
                return 0;

        disk_maping->iterator_bydisk(disk->diskid, __diskmd_rec_pre_scan, disk, DM_FLAG_MD);
        disk_maping->iterator_bydisk(disk->diskid, __diskmd_rec_scan, disk, DM_FLAG_MD);
        disk_maping->iterator_bydisk(disk->diskid, __diskmd_rec_scan, disk, DM_FLAG_RAW);

        diskmd_rec_disk_next_state(disk, __DISK_REC_ANALYSIS__);

        return 0;
err_ret:
        return ret;
}

/**
 * Pick the next group of @disk to work on, round-robin via disk->ptr.
 *
 * All work happens under disk->lock (write):
 *  - empty group list: the disk is marked __DISK_REC_DONE__ if it was in
 *    __DISK_REC_ANALYSIS__, and ENOENT is returned;
 *  - otherwise the cursor is wrapped past the list head if necessary, the
 *    group under the cursor is selected and the cursor advances;
 *  - a selected group with no chunks is passed to diskmd_rec_group_del()
 *    and EAGAIN is returned;
 *  - otherwise the group's refcount is incremented and 0 is returned.
 *
 * @param disk   disk recovery record
 * @param group  out: selected group (valid only when 0 is returned)
 * @return 0, ENOENT, EAGAIN, or the error from taking the lock
 */
static int diskmd_rec_disk_group_select(disk_rec_t *disk, group_rec_t **group)
{
        int ret;
        struct list_head *head = &disk->group_lst.list;

        ret = sy_rwlock_wrlock(&disk->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *group = NULL;
        if (list_empty(head)) {
                DBUG("disk %d group select empty!\n", disk->diskid);
                if (disk->status == __DISK_REC_ANALYSIS__) {
                        disk->status = __DISK_REC_DONE__;
                        DINFO("disk %d rec set done\n", disk->diskid);
                }
                ret = ENOENT;
                GOTO(err_unlock, ret);
        }

        /* cursor sits on the list head: wrap around to the first group */
        if ((struct list_head *)disk->ptr == head)
                disk->ptr = (group_rec_t *)head->next;

        if ((struct list_head *)disk->ptr != head) {
                *group = disk->ptr;
                disk->ptr = (group_rec_t *)(*group)->hook.next;
        }

        if (list_empty(&(*group)->chunk_lst.list)) {
                diskmd_rec_group_del(disk, group);
                ret = EAGAIN;
                GOTO(err_unlock, ret);
        }

        (*group)->refcount++;

        sy_rwlock_unlock(&disk->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&disk->lock);
err_ret:
        return ret;
}


/**
 * Drop the reference taken by diskmd_rec_disk_group_select().
 *
 * Under disk->lock (write) the group's refcount is decremented; if the
 * group's chunk list is empty afterwards, diskmd_rec_group_del() is asked
 * to remove it. A failure to take the lock is routed to
 * UNIMPLEMENTED(__DUMP__).
 *
 * @return always 0
 */
static int diskmd_rec_disk_group_release(disk_rec_t *disk, group_rec_t **group)
{
        int ret = sy_rwlock_wrlock(&disk->lock);

        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        (*group)->refcount--;

        if (list_empty(&(*group)->chunk_lst.list))
                diskmd_rec_group_del(disk, group);

        sy_rwlock_unlock(&disk->lock);

        return 0;
}

/**
 * Send a multi-chunk check request for @groupid to the node returned by
 * md_map_getsrv().
 *
 * If the RPC fails with EREMCHG, the cached mapping is dropped with
 * md_map_drop() and the lookup + RPC are repeated once. Any other error,
 * or a second failure, is returned to the caller.
 *
 * @param groupid    group the chunks belong to
 * @param chkids     array of chunk ids to check
 * @param chk_count  number of entries in @chkids
 * @param retval     out: per-chunk results, passed through to the RPC
 * @return 0 on success, otherwise the lookup or RPC error
 */
static int __diskmd_rec_disk_rec_multi(const chkid_t *groupid, const chkid_t *chkids, int chk_count, int *retval)
{
        int ret, attempts = 0;
        nid_t nid;

        for (;;) {
                ret = md_map_getsrv(groupid, &nid);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                ret = stor_rpc_chunk_check_multi(&nid, groupid, chkids, chk_count, retval);
                if (likely(ret == 0))
                        break;

                /* only one extra attempt, and only for EREMCHG */
                if (attempts >= 1 || ret != EREMCHG) {
                        GOTO(err_ret, ret);
                }

                md_map_drop(groupid, &nid);
                attempts++;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Submit every chunk queued on @wqueue for recovery in one request.
 *
 * The chunk ids are copied, in list order, into a temporary array sized
 * by wqueue->count, which is then handed to
 * __diskmd_rec_disk_rec_multi(). The array is released on both the
 * success and the failure path. The request is bracketed by
 * ANALYSIS_BEGIN/ANALYSIS_END; the END marker is only reached on success.
 *
 * @param groupid  group all queued chunks belong to
 * @param wqueue   list of chunk_rec_t entries to submit (not modified)
 * @param retval   out: per-chunk results, in list order
 * @return 0 on success, otherwise the allocation or request error
 */
int diskmd_rec_disk_rec_multi(const chkid_t *groupid, count_list_t *wqueue, int *retval)
{
        int ret, filled = 0;
        chkid_t *ids;
        struct list_head *cur;

        ret = ymalloc((void **)&ids, sizeof(chkid_t) * wqueue->count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (cur = wqueue->list.next; cur != &wqueue->list; cur = cur->next) {
                ids[filled] = ((chunk_rec_t *)cur)->id;
                filled ++;
        }

        YASSERT(filled == wqueue->count);

        ANALYSIS_BEGIN(0);

        ret = __diskmd_rec_disk_rec_multi(groupid, ids, filled, retval);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ANALYSIS_END(0, 1000 * 1000, "diskmd_rec_multi");

        yfree((void **)&ids);

        return 0;
err_free:
        yfree((void **)&ids);
err_ret:
        return ret;
}

/**
 * Analysis phase: take one batch of chunks from the next group of @disk,
 * submit it for recovery and dispatch each chunk according to its result.
 *
 * Steps:
 *  1. Every 100th call refreshes the token bucket fill rate.
 *  2. A group is selected (its refcount is held until the end).
 *  3. The commit limits are clamped to [DISKMD_REQ_MIN, DISKMD_REQ_MAX]
 *     and tokens for the batch are consumed from the bucket.
 *  4. group->pop() fills a local work queue, which is submitted with
 *     diskmd_rec_disk_rec_multi(); group->update_commit() is told the
 *     outcome of the request.
 *  5. Each chunk is removed from the work queue and then:
 *       - success: the record is freed and the success counters bumped;
 *       - EBUSY/ENONET/ESTALE/ETIMEDOUT/EAGAIN: pushed back to the group
 *         followed by a 100 ms sleep;
 *       - ENOENT: the local replica is unlinked and the record freed;
 *       - anything else: moved to the disk's fail list.
 *
 * The per-chunk result array is zero-filled before the request, so that
 * logging it is well defined even when the request itself failed, and it
 * is released once all chunks have been dispatched.
 *
 * @param disk  disk recovery record in the analysis phase
 * @return 0 on success; the group-select error (e.g. ENOENT/EAGAIN), the
 *         token bucket error, or the request error otherwise
 */
static int diskmd_rec_disk_analysis(disk_rec_t *disk)
{
        int ret, ret1 = 0, idx, commit_count;
        int *retval = NULL;
        group_rec_t *group;
        chunk_rec_t *chunk;
        struct list_head *pos, *n;
        static uint64_t __call_count__ = 0;

        diskmd_rec_t *handler = diskmd_rec_handler;

        if (__call_count__ % 100 == 0) {
                common_load_fill_rate(disk->pool, &handler->token_bucket, 1);
        }
        __call_count__++;

        ret = diskmd_rec_disk_group_select(disk, &group);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        itorange(&handler->thp.opt_commit_max.value, DISKMD_REQ_MIN, DISKMD_REQ_MAX);
        group->commit_max = handler->thp.opt_commit_max.value;
        itorange(&group->commit_count, DISKMD_REQ_MIN, group->commit_max);

        commit_count = _min(group->commit_count, group->chunk_lst.count);

        ret = token_bucket_consume_loop(&handler->token_bucket, commit_count, 100, 10000);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        DBUG("select group "CHKID_FORMAT"\n", CHKID_ARG(&group->id));

        chkid_t groupid;
        count_list_t wqueue;

        groupid = group->id;
        count_list_init(&wqueue);

        ret = group->pop(group, &wqueue);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if (wqueue.count) {
                group->commit += wqueue.count;

                ret = ymalloc((void **)&retval, wqueue.count * sizeof(int));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                /* results may stay unwritten when the request fails outright */
                memset(retval, 0, wqueue.count * sizeof(int));

                ret1 = diskmd_rec_disk_rec_multi(&groupid, &wqueue, retval);

                {
                        ret = group->update_commit(group, ret1);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);
                }

                /* retval[idx] corresponds to the idx-th entry of wqueue */
                idx = 0;
                list_for_each_safe(pos, n, &wqueue.list) {
                        chunk = (chunk_rec_t *)pos;

                        count_list_del(pos, &wqueue);

                        if (unlikely(ret1 || retval[idx])) {

                                chunk->err_code = ret1 ? ret1 : retval[idx];

                                if (ret1 != EBUSY && ret1 != ENOENT) {
                                        // TODO ret1 == 110
                                        DWARN("chunk check "CHKID_FORMAT" multi ret %d ret:%d\n",
                                              CHKID_ARG(&chunk->id),
                                              ret1,
                                              retval[idx]);
                                }

                                if (chunk->err_code == EBUSY ||
                                    chunk->err_code == ENONET ||
                                    chunk->err_code == ESTALE ||
                                    chunk->err_code == ETIMEDOUT ||
                                    chunk->err_code == EAGAIN) {
                                        /* transient error: hand the chunk back to its group */
                                        ret = group->push(group, pos, TRUE);
                                        if (unlikely(ret))
                                                UNIMPLEMENTED(__DUMP__);

                                        usleep(100 * 1000);
                                } else if (chunk->err_code == ENOENT) {
                                        DWARN("chunk "CHKID_FORMAT" meta %ju\n",
                                              CHKID_ARG(&chunk->id), chunk->metaversion);

                                        replica_srv_unlink(&chunk->id, chunk->metaversion);

                                        /*
                                         * The record is on no list any more; release it.
                                         * NOTE(review): assumes replica_srv_unlink() does not
                                         * keep the id pointer after it returns -- confirm.
                                         */
                                        yfree((void **)&chunk);
                                } else {
                                        ret = sy_rwlock_wrlock(&disk->lock);
                                        if (unlikely(ret))
                                                UNIMPLEMENTED(__DUMP__);

                                        count_list_add_tail(pos, &disk->fail_lst);

                                        handler->thp.out.fail++;

                                        sy_rwlock_unlock(&disk->lock);
                                }
                        } else {
                                yfree((void **)&chunk);

                                ret = sy_rwlock_wrlock(&handler->lock);
                                if (unlikely(ret))
                                        UNIMPLEMENTED(__DUMP__);

                                handler->thp.out.success++;
                                handler->thp.out.success_total++;

                                if ((handler->thp.out.success_total % 1000) == 0) {
                                        set_recovery_total(disk->pool, handler->thp.out.success_total, TRUE);
                                }

                                sy_rwlock_unlock(&handler->lock);
                        }

                        idx ++;
                }

                /* per-chunk results are no longer needed */
                yfree((void **)&retval);
        }

        DINFO("%s/"CHKID_FORMAT" disk %d/%d/%d th %d submit %d/%d sp %ju - scan %d busy %d commit %d list %u ret %d\n",
              handler->thp.out.pool_name,
              CHKID_ARG(&group->id),
              handler->disk_lst.count,
              disk->diskid,
              disk->refcount,
              handler->thp.thread_num,
              group->commit_max,
              group->commit_count,
              handler->thp.out.speed,
              group->scan,
              group->busy,
              group->commit,
              group->chunk_lst.count,
              ret1);

        if (unlikely(ret1)) {
                ret = ret1;
                GOTO(err_release, ret);
        }

        diskmd_rec_disk_group_release(disk, &group);
        return 0;
err_release:
        diskmd_rec_disk_group_release(disk, &group);
err_ret:
        return ret;
}

/**
 * Queue @diskid on the handler's redo list unless it is already there.
 *
 * Runs under handler->redo_lock. A new entry records the disk id and the
 * current time (gettime()) in 'last', which the redo timer callback
 * compares against DISKMD_REDO_INTERVAL.
 *
 * @return 0 on success, or the error from taking the spin lock
 */
static int diskmd_rec_disk_redo(int diskid)
{
        int ret, queued = 0;
        struct list_head *cur, *next;
        diskmd_rec_redo_t *seg;
        diskmd_rec_t *handler = diskmd_rec_handler;

        ret = sy_spin_lock(&handler->redo_lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(cur, next, &handler->redo_lst.list) {
                if (((diskmd_rec_redo_t *)cur)->diskid == diskid) {
                        queued = 1;
                        break;
                }
        }

        if (!queued) {
                ret = ymalloc((void **)&seg, sizeof(diskmd_rec_redo_t));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                seg->diskid = diskid;
                seg->last = gettime();

                count_list_add_tail(&seg->hook, &handler->redo_lst);
                DINFO("diskmd_rec redo add disk %d, total %u\n",
                                seg->diskid, handler->redo_lst.count);
        }

        sy_spin_unlock(&handler->redo_lock);

        return 0;
err_ret:
        return ret;
}

/**
 * Redo timer callback.
 *
 * Walks the redo list under handler->redo_lock and, for every entry whose
 * 'last' timestamp is at least DISKMD_REDO_INTERVAL old, calls
 * diskmd_recover_disk() for that disk. On success the entry is removed
 * and freed; on failure it stays queued and a warning is logged. The
 * timer is then re-armed for USEC_PER_SEC.
 *
 * If the global handler is not set yet the callback only logs a warning
 * and returns 0 without re-arming the timer.
 *
 * @param args  unused
 * @return 0 on success, otherwise the lock or timer error
 */
static int __diskmd_redo_cb__(void *args)
{
        int ret;
        struct list_head *cur, *next;
        diskmd_rec_redo_t *seg;
        diskmd_rec_t *handler;

        (void) args;

        handler = diskmd_rec_handler;
        if (handler == NULL) {
                DWARN("diskmd_rec_handler not init\n");
                return 0;
        }

        ret = sy_spin_lock(&handler->redo_lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(cur, next, &handler->redo_lst.list) {
                seg = (diskmd_rec_redo_t *)cur;

                /* not due yet */
                if (gettime() - seg->last < DISKMD_REDO_INTERVAL)
                        continue;

                DINFO("redo disk %d\n", seg->diskid);

                ret = diskmd_recover_disk(seg->diskid);
                if (unlikely(ret)) {
                        DWARN("redo add failed, ret %d\n", ret);
                        continue;
                }

                count_list_del(cur, &handler->redo_lst);
                yfree((void **)&seg);
        }

        sy_spin_unlock(&handler->redo_lock);

        ret = timer1_settime(&handler->redo_handler, USEC_PER_SEC);
        if (unlikely(ret)) {
                YASSERT(0 && "set failed, why");
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Remove the per-pool recovery disk-info key while recovery is idle.
 *
 * When the thread pool status is __RECOVERY_WAITING__, the nodectl key
 * RECOVERY "<pool>" RECOVERY_DISK_INFO is unlinked. The key is built with
 * a bounded snprintf(); a pool name that does not fit is rejected instead
 * of overflowing the buffer or unlinking a truncated key.
 *
 * @todo by pool
 *
 * @param pool  pool name used to build the key
 * @return 0 on success, EPERM when the recovery handler is not
 *         initialised, ENAMETOOLONG when the key does not fit
 */
int diskmd_recovery_check(const char *pool)
{
        diskmd_rec_t *handler = diskmd_rec_handler;
        if (handler == NULL) {
                DWARN("diskmd_recovery not ready\n");
                return EPERM;
        }

        if (handler->thp.status == __RECOVERY_WAITING__) {
                int len;
                char key[MAX_NAME_LEN];

                len = snprintf(key, sizeof(key), RECOVERY"%s"RECOVERY_DISK_INFO, pool);
                if (len < 0 || (size_t)len >= sizeof(key)) {
                        DWARN("recovery key for pool %s too long\n", pool);
                        return ENAMETOOLONG;
                }

                nodectl_unlink(key);
        }

        return 0;
}

/**
 * Iterator callback that prints the chunk info of one chunk to stdout.
 *
 * The info is fetched with md_chunk_getinfo1() into a CHKINFO_MAX-byte
 * stack buffer, rendered with CHKINFO_STR and printed as
 * "chkinfo: <text>". Only @chkid and @pool are used.
 *
 * @return 0 on success, otherwise the md_chunk_getinfo1() error
 */
static int __diskmd_get_chunkinfo__(const chkid_t *chkid, const char *pool, const diskloc_t *_loc,
                         const chkid_t *_parent, const uint64_t meta_version, void *ctx)
{
        int ret;
        char storage[CHKINFO_MAX];
        char text[MAX_BUF_LEN];
        chkinfo_t *info = (void *)storage;

        ret = md_chunk_getinfo1(pool,NULL, chkid, info, NULL);
        if (ret != 0)
                GOTO(err_ret, ret);

        CHKINFO_STR(info, text);
        printf("chkinfo: %s\n", text);

        return 0;
err_ret:
        return ret;
}

/**
 * Print the chunk info of every chunk mapped to @disk.
 *
 * With gloconf.kv_redis set this is not implemented: it reports
 * UNIMPLEMENTED(__WARN__) and returns 0. Otherwise it passes through
 * UNIMPLEMENTED(__DUMP__), initialises the disk mapping on the fixed
 * chunk directory and iterates over both RAW and MD entries with
 * __diskmd_get_chunkinfo__().
 *
 * @param disk  disk id to enumerate
 * @return 0 on success, otherwise the disk mapping init error
 */
int diskmd_get_disk_chunkinfo(int disk)
{
        int ret = 0;
        char path[MAX_PATH_LEN];

        if (gloconf.kv_redis) {
                UNIMPLEMENTED(__WARN__);
                (void) disk;
                return 0;
        }

        UNIMPLEMENTED(__DUMP__);

        snprintf(path, MAX_PATH_LEN, "/opt/fusionstack/data/chunk");
        ret = disk_maping->init(path);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        disk_maping->iterator_bydisk(disk, __diskmd_get_chunkinfo__, &disk,
                                     DM_FLAG_RAW | DM_FLAG_MD);

        return 0;
err_ret:
        return ret;
}

/**
 * Start (or join) background recovery for a failed disk.
 *
 * Design note: once a disk has failed, the data on it can only shrink,
 * never grow.
 *
 * When gloconf.background_recovery is 0 the request is logged and
 * ignored (returns 0). Otherwise the disk is registered with the global
 * recovery handler and the thread-pool monitor is started if needed.
 *
 * @param diskid  id of the disk to recover
 * @return 0 on success or when recovery is disabled, EPERM when the
 *         recovery handler has not been initialised yet, otherwise the
 *         error from registering the disk
 */
int diskmd_recover_disk(int diskid)
{
        int ret;

        DWARN("disk %d\n", diskid);

        if (gloconf.background_recovery == 0) {
                DERROR("recovery disabled\n");
                return 0;
        }

        /* same guard and error code as diskmd_recovery_check() */
        if (diskmd_rec_handler == NULL) {
                DWARN("diskmd_recovery not ready\n");
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = diskmd_rec_disk_add(diskmd_rec_handler, diskid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        diskmd_rec_thp_start();

        return 0;
err_ret:
        return ret;
}

/**
 * Request that recovery of @diskid be stopped.
 *
 * Intended use (from the original notes):
 *  1. the pool was deleted, so its recovery tasks must stop;
 *  2. recovery of the disk has finished.
 *
 * Under handler->lock (write), every disk record with a matching id whose
 * disk_status is still __DISK_STATUS_INIT is switched to
 * __DISK_STATUS_STOP.
 *
 * @return 0 on success, or the error from taking the lock
 */
int diskmd_recovery_stop_disk(int diskid)
{
        int ret;
        disk_rec_t *entry;
        struct list_head *cur, *next;
        diskmd_rec_t *handler = diskmd_rec_handler;

        DINFO("disk %d\n", diskid);

        ret = sy_rwlock_wrlock(&handler->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(cur, next, &handler->disk_lst.list) {
                entry = list_entry(cur, disk_rec_t, hook);

                if (entry->diskid != diskid)
                        continue;

                if (entry->disk_status != __DISK_STATUS_INIT)
                        continue;

                DWARN("stop disk %d\n", diskid);
                entry->disk_status = __DISK_STATUS_STOP;
        }

        sy_rwlock_unlock(&handler->lock);
        return 0;
err_ret:
        return ret;
}

/**
 * Report whether @diskid is no longer tracked by the recovery handler.
 *
 * @param diskid  disk id to look up
 * @param stop    out: 0 when a record for @diskid is on the handler's
 *                disk list, 1 otherwise (also 1 when the lock could not
 *                be taken)
 * @return 0 on success, or the error from taking handler->lock
 */
int diskmd_recovery_is_stopped(int diskid, int *stop)
{
        int ret, tracked = 0;
        disk_rec_t *entry;
        struct list_head *cur, *next;
        diskmd_rec_t *handler = diskmd_rec_handler;

        *stop = 1;

        ret = sy_rwlock_rdlock(&handler->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(cur, next, &handler->disk_lst.list) {
                entry = list_entry(cur, disk_rec_t, hook);
                if (entry->diskid == diskid) {
                        tracked = 1;
                        break;
                }
        }

        if (tracked)
                *stop = 0;

        sy_rwlock_unlock(&handler->lock);

        return 0;
err_ret:
        return ret;
}

/**
 * Tell whether disk recovery is currently active.
 *
 * @return 1 when the thread pool is flagged as started and its output
 *         status is not __RECOVERY_WAITING__, 0 otherwise. The fields are
 *         read without taking any lock.
 */
int diskmd_recovery_is_running()
{
        diskmd_rec_t *handler = diskmd_rec_handler;

        if (!handler->thp.started)
                return 0;

        if (handler->thp.out.status == __RECOVERY_WAITING__)
                return 0;

        return 1;
}

/**
 * One-time initialisation of the disk recovery subsystem.
 *
 * Allocates the recovery handler, sets up its locks, disk/redo lists,
 * thread-pool bookkeeping (all DISKMD_THP_TH_MAX slots marked
 * __TH_STATUS_STOPPED__), the commit-max option and the token bucket,
 * then creates and arms the redo timer (USEC_PER_SEC). Only after all of
 * that succeeds is the handler published in diskmd_rec_handler.
 *
 * Must not be called when diskmd_rec_handler is already set (asserted).
 *
 * @return 0 on success; otherwise the first error encountered, with the
 *         handler memory freed
 */
int diskmd_recover_init()
{
        int ret, slot;
        diskmd_rec_t *rec;

        YASSERT(diskmd_rec_handler == NULL);

        ret = ymalloc((void **)&rec, sizeof(diskmd_rec_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* handler-level locks and lists */
        ret = sy_rwlock_init(&rec->lock, "diskmd_rec.lock");
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = sy_spin_init(&rec->redo_lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        count_list_init(&rec->disk_lst);
        count_list_init(&rec->redo_lst);

        /* thread-pool state */
        ret = sy_rwlock_init(&rec->thp.lock, "diskmd_rec_thp.lock");
        if (unlikely(ret))
                GOTO(err_free, ret);

        rec->thp.started = 0;
        rec->thp.thread_num = 0;
        rec->thp.status = __RECOVERY_WAITING__;

        memset(rec->thp.out.pool_name, 0, sizeof(rec->thp.out.pool_name));

        recovery_out_reset(&rec->thp.out);
        timerange1_init(&rec->thp.range, "diskmd_rec", 1000 * 1000);

        for (slot = 0; slot < DISKMD_THP_TH_MAX; slot++) {
                rec->thp.segs[slot].idx = slot;
                rec->thp.segs[slot].status = __TH_STATUS_STOPPED__;
        }

        // nodectl_option_init(&handler->thp.opt_switch);
        opt_nodectl_init2(&rec->thp.opt_commit_max);

        token_bucket_init(&rec->token_bucket, "diskmd_recovery", 0, 0, 0, 0, 0);

        // if not exist, set default
        // diskmd_rec_get_thread();

        /* redo timer: created, then armed for the first tick */
        ret = timer1_create(&rec->redo_handler, "diskmd_rec.redo", __diskmd_redo_cb__, NULL);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = timer1_settime(&rec->redo_handler, USEC_PER_SEC);
        if (unlikely(ret))
                GOTO(err_free, ret);

        diskmd_rec_handler = rec;

        return 0;
err_free:
        yfree((void **)&rec);
err_ret:
        return ret;
}
